import sys

from kafka import KafkaProducer
import json
from dbse.mysql.db_mysql_insert import getDataBase

# 创建生产者
producer = KafkaProducer(
    # bootstrap_servers=['39.98.82.109:29092','39.98.82.109:39092'],  # Kafka 服务器地址
    bootstrap_servers=['192.168.0.110:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # 序列化器
)

# 发送消息
try:
    pProfile = sys.argv[1:][0]
    # 查询任务
    db = getDataBase(pProfile)

    query_sql = 'select * from stock_all'
    cursor = db.cursor()
    cursor.execute(query_sql)
    stockAlls = cursor.fetchall()
    for stock in stockAlls:
        print('success')
        msg = {'id':stock[0], 'stockCode': stock[1] , 'stockName':stock[3], 'jys' : stock[4].lower()}
        producer.send('dfcf_stock_topic', msg)

    # 等待所有消息发送完成
    producer.flush()
    print("====消息发送成功====")

except Exception as e:
    print(f"发送消息时出错: {e}")

finally:
    # 关闭生产者
    producer.close()

